package redisext

import (
	"context"
	"fmt"
	"math"
	"unsafe"

	"gitee.com/xfrm/middleware/xlog"
)

type BloomInterface interface {
	Add(context.Context, string) error
	Exists(context.Context, string) (bool, error)
	Del(context.Context) error

	// M 系列函数慎重使用。当错误率 0.01，每个 entry 存在 7 个 getbit 操作，entries 数量很多时，会占用 redis 大量时间，容易影响其他业务。有问题可以咨询"seawead服务治理" 群组。
	Mexists(context.Context, ...string) ([]bool, error)
	// M 系列函数慎重使用。当错误率 0.01，每个 entry 存在 7 个 getbit 操作，entries 数量很多时，会占用 redis 大量时间，容易影响其他业务。有问题可以咨询"seawead服务治理" 群组。
	Madd(context.Context, ...string) error
}

// 新建一个 redis bloom，其中参数:
//
//  namespace 一般传递服务名称， 如 api/reportapi，代码中可以使用 sbase.Servname()；
//  key， bloom 的名称，符合 redis 的 key 规范；
//  entries， 预估插入元素的数量；
//  errorRate, 允许的最大失败率;
//
//  entries 尽量预估准确，如果预估小了，插入元素数量太多，会导致错误率上升；如果预估大了，浪费 redis 资源。
//  ps. 不允许新建 key 相同，但是 entries/errorRate 不同的 bloom，如果想要变更 entries/errorRate, 先调用 Del 函数后重新创建。
func NewRedisBloom(namespace, key string, entries int64, errorRate float64) (BloomInterface, error) {
	var bloom RedisBloom
	if err := bloom.Init(namespace, key, entries, errorRate); err != nil {
		return nil, err
	}
	return &bloom, nil
}

const (
	MaxBloomValueBit = 512 * 1024 * 1024 * 8 // 512M

	BloomPrefix = "__bloom_prefix"

	BloomAddLua = `		
		local result ={} 
		for i = 1,#(ARGV) do 
   			result[i]= redis.call('setbit', KEYS[1], ARGV[i], 1) 
		end 
		return result`
	BloomExistsLua = `
		local result ={} 
		for i = 1,#(ARGV) do 
   			result[i]= redis.call('getbit', KEYS[1], ARGV[i]) 
		end 
		return result`
)

type RedisBloom struct {
	*RedisExt
	key string

	bits   int64
	hashes int

	addScript    *Script
	existsScript *Script
}

type bloomHash struct {
	A uint64
	B uint64
}

func (r *RedisBloom) Init(namespace, key string, entries int64, errorRate float64) error {
	if entries < 1 || errorRate <= 0 || errorRate >= 1.0 {
		return fmt.Errorf("redis bloom init err: param error")
	}
	if err := r.setHashesAndBits(entries, errorRate); err != nil {
		return err
	}

	r.key = key
	r.RedisExt = NewRedisExt(namespace, BloomPrefix)
	r.addScript = NewScript(BloomAddLua)
	r.existsScript = NewScript(BloomExistsLua)
	return nil
}

func (r *RedisBloom) Add(ctx context.Context, ele string) error {
	command := "redisBloom.Add"

	if err := r.madd(ctx, ele); err != nil {
		xlog.Warnf(ctx, "%s error, entry: %v, err: %v", command, ele, err)
		return err
	}
	return nil
}

func (r *RedisBloom) Madd(ctx context.Context, eles ...string) error {
	command := "redisBloom.Madd"

	if err := r.madd(ctx, eles...); err != nil {
		xlog.Warnf(ctx, "%s error, entries: %v, err: %v", command, eles, err)
		return err
	}
	return nil
}

func (r *RedisBloom) Exists(ctx context.Context, ele string) (bool, error) {
	command := "redisBloom.Exists"

	res, err := r.mexists(ctx, ele)
	if err != nil {
		xlog.Warnf(ctx, "%s error, entry: %s, err: %v", command, ele, err)
		return false, err
	}
	return res[0], nil
}

func (r *RedisBloom) Mexists(ctx context.Context, eles ...string) ([]bool, error) {
	command := "redisBloom.Mexists"

	res, err := r.mexists(ctx, eles...)
	if err != nil {
		xlog.Warnf(ctx, "%s error, entries: %v, err: %v", command, eles, err)
		return nil, err
	}
	return res, nil
}

func (r *RedisBloom) Del(ctx context.Context) error {
	cnt, err := r.RedisExt.Del(ctx, r.key)
	if err != nil {
		return err
	}
	if cnt == 0 {
		return fmt.Errorf("no exists bloom")
	}
	return nil
}

func (r *RedisBloom) madd(ctx context.Context, eles ...string) error {
	if len(eles) == 0 {
		return fmt.Errorf("param is empty")
	}

	offsets := r.getAllOffsetsFromHash(r.bloomCalcMultiHash64(eles...))
	_, err := r.Eval(ctx, r.addScript, []string{r.key}, toInterfaceArray(offsets)...)
	if err != nil {
		return fmt.Errorf("eval err: %v", err)
	}
	return nil
}

func (r *RedisBloom) mexists(ctx context.Context, eles ...string) ([]bool, error) {
	if len(eles) == 0 {
		return nil, fmt.Errorf("param is empty")
	}

	offsets := r.getAllOffsetsFromHash(r.bloomCalcMultiHash64(eles...))
	res, err := r.Eval(ctx, r.existsScript, []string{r.key}, toInterfaceArray(offsets)...)
	if err != nil {
		return nil, fmt.Errorf("eval err: %v", err)
	}

	bits, err := r.parseEvalResult(res, len(eles))
	if err != nil {
		return nil, fmt.Errorf("parse eval result err: %v", err)
	}

	result := make([]bool, len(eles))
	for i := range eles {
		result[i] = r.checkEntryExist(bits[i*r.hashes : (i+1)*r.hashes])
	}
	return result, nil
}

func (r *RedisBloom) checkEntryExist(bits []int64) bool {
	for _, bit := range bits {
		if bit == 0 {
			return false
		}
	}
	return true
}

// interface -> []interface{} -> []int64
func (r *RedisBloom) parseEvalResult(res interface{}, entriesNum int) ([]int64, error) {
	interfaceArr, ok := res.([]interface{})
	if !ok {
		return nil, fmt.Errorf("result not []interface{}, type: %T, content: %v", res, res)
	}
	if len(interfaceArr) != r.hashes*entriesNum {
		return nil, fmt.Errorf("result len error, hashes_len: %d, res_length: %d, content: %v", r.hashes, len(interfaceArr), res)
	}
	bits, ok := toInt64Array(interfaceArr)
	if !ok {
		return nil, fmt.Errorf("result to []int64 fail, type: %T, content: %v", interfaceArr[0], res)
	}
	return bits, nil
}

//  参考 http://en.wikipedia.org/wiki/Bloom_filter
//  bits = (entries * ln(error)) / ln(2)^2
//  hashes = bpe * ln(2)
func (r *RedisBloom) setHashesAndBits(entries int64, errorRate float64) error {
	bpe := calc_bpe(errorRate)

	r.bits = int64(float64(entries) * bpe)
	if r.bits > MaxBloomValueBit {
		return fmt.Errorf("bloom entries is too big")
	}

	r.hashes = int(math.Ceil(0.693147180559945 * bpe)) // ln(2)
	return nil
}

func (b *RedisBloom) bloomCalcHash64(ele string) *bloomHash {
	var res bloomHash
	res.A = murmurHash64A(ele, 0xc6a4a7935bd1e995)
	res.B = murmurHash64A(ele, res.A)
	return &res
}

func (b *RedisBloom) bloomCalcMultiHash64(eles ...string) []*bloomHash {
	result := make([]*bloomHash, len(eles))
	for i, entry := range eles {
		result[i] = b.bloomCalcHash64(entry)
	}
	return result
}

func (b *RedisBloom) getOffsetsFromHash(hash *bloomHash) []uint64 {
	var offsets []uint64
	for i := uint64(0); i < uint64(b.hashes); i++ {
		offset := (hash.A + i*hash.B) % uint64(b.bits)
		offsets = append(offsets, offset)
	}
	return offsets
}

func (b *RedisBloom) getAllOffsetsFromHash(hashs []*bloomHash) []uint64 {
	result := make([]uint64, 0, len(hashs)*b.hashes)
	for _, hash := range hashs {
		result = append(result, b.getOffsetsFromHash(hash)...)
	}
	return result
}

func toInterfaceArray(arr []uint64) []interface{} {
	result := make([]interface{}, len(arr))
	for i, ele := range arr {
		result[i] = ele
	}
	return result
}

func toInt64Array(arr []interface{}) ([]int64, bool) {
	result := make([]int64, len(arr))

	ok := false
	for i, ele := range arr {
		result[i], ok = ele.(int64)
		if !ok {
			return nil, false
		}
	}
	return result, true
}

func calc_bpe(errRate float64) float64 {
	const denom = 0.480453013918201 // ln(2)^2
	num := math.Log(errRate)

	return math.Abs(num / denom)
}

func murmurHash64A(ele string, seed uint64) uint64 {
	const m uint64 = 0xc6a4a7935bd1e995
	const r = 47
	eleLen := len(ele)

	h := seed ^ (uint64(eleLen) * m)

	data := []byte(ele)
	nblocks := len(ele) / 8
	for i := 0; i < nblocks; i++ {
		k := *(*uint64)(unsafe.Pointer(&data[i*8]))

		k *= m
		k ^= k >> r
		k *= m

		h ^= k
		h *= m
	}

	data2 := ele[eleLen/8*8:]
	switch eleLen & 7 {
	case 7:
		h ^= (uint64(data2[6])) << 48
		fallthrough
	case 6:
		h ^= (uint64(data2[5])) << 40
		fallthrough
	case 5:
		h ^= (uint64(data2[4])) << 32
		fallthrough
	case 4:
		h ^= (uint64(data2[3])) << 24
		fallthrough
	case 3:
		h ^= (uint64(data2[2])) << 16
		fallthrough
	case 2:
		h ^= (uint64(data2[1])) << 8
		fallthrough
	case 1:
		h ^= uint64(data2[0])
		h *= m
	}

	h ^= h >> r
	h *= m
	h ^= h >> r

	return h
}
